package com.wugui.datax.rpc.remoting.net.impl.netty_http.server;

import com.wugui.datax.rpc.remoting.net.Server;
import com.wugui.datax.rpc.remoting.net.common.NettyConstant;
import com.wugui.datax.rpc.remoting.net.params.Beat;
import com.wugui.datax.rpc.remoting.provider.XxlRpcProviderFactory;
import com.wugui.datax.rpc.util.ThreadPoolUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;

/**
 * netty_http
 *
 * @author xuxueli 2015-11-24 22:25:15
 */
@Slf4j
public class NettyHttpServer extends Server {

  private Thread thread;

  @Override
  public void start(final XxlRpcProviderFactory xxlRpcProviderFactory) {

    thread =
        new Thread(
            () -> {
              // param
              final ThreadPoolExecutor serverHandlerPool =
                  ThreadPoolUtil.makeServerThreadPool(
                      NettyHttpServer.class.getSimpleName(),
                      xxlRpcProviderFactory.getCorePoolSize(),
                      xxlRpcProviderFactory.getMaxPoolSize());
              EventLoopGroup bossGroup = new NioEventLoopGroup();
              EventLoopGroup workerGroup = new NioEventLoopGroup();

              try {
                // start server
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(
                        new ChannelInitializer<SocketChannel>() {
                          @Override
                          public void initChannel(SocketChannel channel) {
                            channel
                                .pipeline()
                                .addLast(
                                    new IdleStateHandler(
                                        0,
                                        0,
                                        Beat.BEAT_INTERVAL * 3,
                                        TimeUnit.SECONDS)) // beat 3N, close if idle
                                .addLast(new HttpServerCodec())
                                .addLast(
                                    new HttpObjectAggregator(
                                        NettyConstant
                                            .MAX_LENGTH)) // merge request & reponse to FULL
                                .addLast(
                                    new NettyHttpServerHandler(
                                        xxlRpcProviderFactory, serverHandlerPool));
                          }
                        })
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

                // bind
                ChannelFuture future = bootstrap.bind(xxlRpcProviderFactory.getPort()).sync();

                log.info(
                    ">>>>>>>>>>> xxl-rpc remoting server start success, nettype = {}, port = {}",
                    NettyHttpServer.class.getName(),
                    xxlRpcProviderFactory.getPort());
                onStarted();

                // wait util stop
                future.channel().closeFuture().sync();

              } catch (InterruptedException e) {
                if (e instanceof InterruptedException) {
                    log.info(">>>>>>>>>>> xxl-rpc remoting server stop.");
                } else {
                    log.error(">>>>>>>>>>> xxl-rpc remoting server error.", e);
                }
              } finally {

                // stop
                try {
                  serverHandlerPool.shutdown(); // shutdownNow
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                }
                try {
                  workerGroup.shutdownGracefully();
                  bossGroup.shutdownGracefully();
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                }
              }
            });
    thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
    thread.start();
  }

  @Override
  public void stop() {
    // destroy server thread
    if (thread != null && thread.isAlive()) {
      thread.interrupt();
    }

    // on stop
    onStopped();
      log.info(">>>>>>>>>>> xxl-rpc remoting server destroy success.");
  }
}
